package com.starsky.common.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /**
     * AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效，有以下几种：
     * <p>
     * RECORD
     * 每处理一条commit一次
     * <p>
     * BATCH(默认)
     * 每次poll的时候批量提交一次，频率取决于每次poll的调用频率
     * <p>
     * TIME
     * 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢？)
     * <p>
     * COUNT
     * 累积达到ackCount次的ack去commit
     * <p>
     * COUNT_TIME
     * ackTime或ackCount哪个条件先满足，就commit
     * <p>
     * MANUAL
     * listener负责ack，但是背后也是批量上去
     * <p>
     * MANUAL_IMMEDIATE
     * listner负责ack，每调用一次，就立即commit
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));
        factory.setConcurrency(concurrency);
        // 开启批量消费
        factory.setBatchListener(true);
        //最大的poll数据间隔，如果超过这个间隔没有发起pool请求，但heartbeat仍旧在发，就认为该consumer处于 livelock状态。
        // 就会将该consumer退出consumer group
        factory.getContainerProperties().setPollTimeout(1500);
        //开启手动提交ack
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    public Map<String, Object> consumerConfigs() {
        // 服务器地址、分组配置、自动提交-false、自动重设offset、序列化配置
        Map<String, Object> propsMap = new HashMap<>();
        // 指定kafka server的地址，集群配多个，中间，逗号隔开
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        // enable.auto.commit:true --> 设置自动提交offset
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        // #如果'enable.auto.commit'为true，则消费者偏移自动提交给Kafka的频率（以毫秒为单位），默认值为5000。
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        //设置session回话超时时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        //指定默认消费者group id --> 由于在kafka中，同一组中的consumer不会读取到同一个消息，依靠groud.id设置组名
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //smallest和largest才有效，如果smallest重新0开始读取，如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // 指定序列化类
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        return propsMap;
    }
}
